package com.xiaofan

import org.apache.flink.cep.{PatternSelectFunction, PatternTimeoutFunction}
import org.apache.flink.cep.scala.{CEP, PatternStream}
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

import java.util

/**
 * 输入输出样例类型
 */
case class OrderEvent(orderId: Long, eventType: String, txId: String, timestamp: Long)

case class OrderResult(orderId: Long, resultMsg: String)


object OrderTimeout {
  def main(args: Array[String]): Unit = {

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val inputStream: DataStream[String] = env.readTextFile("D:\\big-data\\code\\UserBehaviorAnalysis\\OrderPayDetect\\src\\main\\resources\\OrderLog.csv")
    val orderEventStream: DataStream[OrderEvent] = inputStream
      .map {
        data => {
          val arr: Array[String] = data.split(",")
          OrderEvent(arr(0).toLong, arr(1), arr(2), arr(3).toLong)
        }
      }
      .assignAscendingTimestamps(_.timestamp * 1000L)
      .keyBy(_.orderId)

    // 1.定义一个Pattern
    val orderPayPattern: Pattern[OrderEvent, OrderEvent] = Pattern
      .begin[OrderEvent]("create").where(_.eventType == "create")
      .followedBy("pay").where(_.eventType == "pay")
      .within(Time.minutes(15))

    // 2.将pattern应用到数据流上，进行模式测试
    val patternStream: PatternStream[OrderEvent] = CEP.pattern(orderEventStream, orderPayPattern)

    // 3. 定义测输出流标签， 用于处理超市事件
    val orderTimeoutTag = new OutputTag[OrderResult]("orderTimeout")

    // 4. 调用select方法，提取饼处理匹配成功的支付事件和超时事件
    val resultStream: DataStream[OrderResult] = patternStream.select(orderTimeoutTag, new OrderTimeoutSelect(), new OrderPaySelect())

    resultStream.print("payed")
    resultStream.getSideOutput(orderTimeoutTag).print("timeout")

    env.execute("order time job")
  }
}

// 实现自定义的PatternTimeoutFunction以及PatternSelectFunction
class OrderTimeoutSelect() extends PatternTimeoutFunction[OrderEvent, OrderResult] {
  override def timeout(pattern: util.Map[String, util.List[OrderEvent]], timeoutTimestamp: Long): OrderResult = {
    val timeoutOrderId: Long = pattern.get("create").iterator().next().orderId
    OrderResult(timeoutOrderId, "timeout " + ": +" + timeoutTimestamp)
  }
}

class OrderPaySelect() extends PatternSelectFunction[OrderEvent, OrderResult] {
  override def select(pattern: util.Map[String, util.List[OrderEvent]]): OrderResult = {
    val payedOrderId = pattern.get("pay").iterator().next().orderId
    OrderResult(payedOrderId, "pay successfully!")
  }
}
























